{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 303 - Transfer Learning by DNN Featurization\n",
    "\n",
    "Classify automobile vs airplane using DNN featurization and transfer learning\n",
    "against a subset of images from CIFAR-10 dataset."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, we load first batch of CIFAR-10 training data into NumPy array."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from mmlspark import CNTKModel, ModelDownloader\n",
    "import numpy as np, pandas as pd\n",
    "import os, urllib, tarfile, pickle, array\n",
    "from os.path import abspath\n",
    "from pyspark.sql.functions import col, udf\n",
    "from pyspark.sql.types import *\n",
    "\n",
    "cdnURL = \"https://mmlspark.azureedge.net/datasets\"\n",
    "\n",
    "# Please note that this is a copy of the CIFAR10 dataset originally found here:\n",
    "# http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz\n",
    "dataFile = \"cifar-10-python.tar.gz\"\n",
    "dataURL = cdnURL + \"/CIFAR10/\" + dataFile\n",
    "\n",
    "if not os.path.isfile(dataFile):\n",
    "    urllib.request.urlretrieve(dataURL, dataFile)\n",
    "with tarfile.open(dataFile, \"r:gz\") as f:\n",
    "    train_dict = pickle.load(f.extractfile(\"cifar-10-batches-py/data_batch_1\"),\n",
    "                             encoding=\"latin1\")\n",
    "\n",
    "train_data = np.array(train_dict[\"data\"])\n",
    "train_labels = np.array(train_dict[\"labels\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Load DNN Model and pick one of the inner layers as feature output"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "modelName = \"ConvNet\"\n",
    "modelDir = \"wasb:///models/\"\n",
    "modelDir = \"file:\" + abspath(\"models\")\n",
    "d = ModelDownloader(spark, modelDir)\n",
    "model = d.downloadByName(modelName)\n",
    "print(model.layerNames)\n",
    "cntkModel = CNTKModel().setInputCol(\"images\").setOutputCol(\"features\") \\\n",
    "                       .setModelLocation(spark, model.uri).setOutputNodeName(\"l8\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Format raw CIFAR data into correct shape."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def reshape_image(record):\n",
    "    image, label = record\n",
    "    data = [float(x) for x in image.reshape(3,32,32).flatten()]\n",
    "    return data, int(label)\n",
    "\n",
    "convert_to_float = udf(lambda x: x, ArrayType(FloatType()))\n",
    "\n",
    "image_rdd = zip(train_data,train_labels)\n",
    "image_rdd = spark.sparkContext.parallelize(image_rdd).map(reshape_image)\n",
    "\n",
    "imagesWithLabels = image_rdd.toDF([\"images\", \"labels\"])\n",
    "imagesWithLabels = imagesWithLabels.withColumn(\"images\", convert_to_float(col(\"images\")))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Select airplanes (label=0) and automobiles (label=1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "imagesWithLabels = imagesWithLabels.filter(\"labels<2\")\n",
    "imagesWithLabels.cache()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Featurize images"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "featurizedImages = cntkModel.transform(imagesWithLabels).select([\"features\",\"labels\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use featurized images to train a classifier"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from mmlspark import TrainClassifier\n",
    "from pyspark.ml.classification import RandomForestClassifier\n",
    "\n",
    "train,test = featurizedImages.randomSplit([0.75,0.25])\n",
    "\n",
    "model = TrainClassifier(model=RandomForestClassifier(),labelCol=\"labels\").fit(train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Evaluate the accuracy of the model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from mmlspark import ComputeModelStatistics\n",
    "predictions = model.transform(test)\n",
    "metrics = ComputeModelStatistics(evaluationMetric=\"accuracy\").transform(predictions)\n",
    "metrics.show()"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [default]",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
